Skip to content

Conversation

@yuguorui
Copy link

Fixed the Stream[T].Next() method to properly handle data events that don't have an explicit event: field. Previously, when the decoder encountered data without an explicit event type, it would skip processing entirely. Now, when eventType is empty, the decoder attempts to unmarshal the data directly as JSON.

… handling

Fixed the Stream[T].Next() method to properly handle data events that don't
have an explicit event: field. Previously, when the decoder encountered data
without an explicit event type, it would skip processing entirely. Now, when
eventType is empty, the decoder attempts to unmarshal the data directly as
JSON.

Signed-off-by: yuguorui <[email protected]>
Copilot AI review requested due to automatic review settings December 18, 2025 15:53
@yuguorui yuguorui requested a review from a team as a code owner December 18, 2025 15:53
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes the SSE stream decoder to properly handle data-only messages (without explicit event types) and [DONE] sentinel messages. Previously, messages without an event type field were skipped entirely, causing data loss.

Key changes:

  • Added explicit handling for [DONE] messages to gracefully terminate the stream
  • Introduced fallback processing for events without an explicit type field by attempting direct JSON unmarshaling
  • Optimized by caching event type and data to avoid repeated decoder calls
Comments suppressed due to low confidence (1)

packages/ssestream/ssestream.go:190

  • The JSON unmarshaling and assignment logic is duplicated three times. Consider extracting this into a helper method or consolidating the cases to reduce code duplication and improve maintainability.
		if eventType == "" {
			// For Anthropic's SSE format, data events without explicit type should still be processed
			var nxt T
			s.err = json.Unmarshal(eventData, &nxt)
			if s.err != nil {
				return false
			}
			s.cur = nxt
			return true
		}

		switch eventType {
		case "completion":
			var nxt T
			s.err = json.Unmarshal(eventData, &nxt)
			if s.err != nil {
				return false
			}
			s.cur = nxt
			return true
		case "message_start", "message_delta", "message_stop", "content_block_start", "content_block_delta", "content_block_stop":
			var nxt T
			s.err = json.Unmarshal(eventData, &nxt)
			if s.err != nil {
				return false
			}
			s.cur = nxt
			return true

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

eventData := s.decoder.Event().Data

// Check for [DONE] message which indicates end of stream
if string(eventData) == "[DONE]\n" || string(eventData) == "[DONE]" {
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converting byte slices to strings for comparison is inefficient. Use bytes.Equal() or bytes.HasPrefix() instead to avoid unnecessary allocations. For example: bytes.Equal(eventData, []byte("[DONE]")) or handle the newline separately.

Suggested change
if string(eventData) == "[DONE]\n" || string(eventData) == "[DONE]" {
if bytes.Equal(eventData, []byte("[DONE]\n")) || bytes.Equal(eventData, []byte("[DONE]")) {

Copilot uses AI. Check for mistakes.
@yuguorui
Copy link
Author

Functional:
13:24:29.942090 ssestream.go:70: bytes: event: message_start
13:24:29.942103 ssestream.go:70: bytes: data: {"type":"message_start","message":{"model":"claude-sonnet-4-5-20250929","id":"msg_bdrk_01GjkcKtyn15hiXkeqfKMkds","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":2}}}

13:24:29.942280 ssestream.go:70: bytes: event: content_block_start
13:24:29.942283 ssestream.go:70: bytes: data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

13:24:30.178601 ssestream.go:70: bytes: event: content_block_delta
13:24:30.178625 ssestream.go:70: bytes: data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"A **"}}

Problematic (which is missing the event: XXX respoonse):
13:23:41.652593 ssestream.go:70: bytes: data: {"type":"message_start","message":{"model":"claude-opus-4-5-20251101","id":"msg_vrtx_01VYYUR25nt7cHDjKVLCoXzw","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":13,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":5}} }
13:23:41.693849 ssestream.go:70: bytes: data: {"type": "ping"}
13:23:41.913679 ssestream.go:70: bytes: data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} }
13:23:41.913719 ssestream.go:70: bytes: data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"A quaternion"} }
13:23:41.913732 ssestream.go:70: bytes: data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" is a four"} }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant